#include "rpcserver/rpc_server.h"
#include "base/atomic.h"
#include "log/log.h"
#include "callback/callback.h"
#include "net/tcp_listener.h"
#include "net/channel_core.h"

namespace weilin {
	RpcServer::RpcServer(const ChannelType channel_type, const string& ip, const uint16_t& port, const bool& is_keep_alive, const uint32_t& ms_timeout, const call_back_shared_ptr& call_back_sp) :
		 m_channel_type(channel_type), m_ip(ip), m_port(port), 
		 m_is_keep_alive(is_keep_alive), m_ms_timeout(ms_timeout),
		 m_call_back_sp(call_back_sp), m_channel_seq(0)
	{
		m_channel_unordered_map.clear();
	}

	RpcServer::~RpcServer()
	{

	}
	bool RpcServer::Init()
	{
		m_call_back_sp->SetAcceptedHandleCb( boost::bind(&RpcServer::acceptedHandleCb, shared_from_this(), _1));
		m_call_back_sp->SetSocketStateHandleCb(boost::bind(&RpcServer::socketStateHandle, shared_from_this(), _1, _2));
		m_tcp_listenr_sp = boost::make_shared<TcpListener>(m_ip, m_port, m_call_back_sp);

		bool ret = m_tcp_listenr_sp->Init();
        return ret;
	}
	bool RpcServer::Start()
	{
		bool ret = m_tcp_listenr_sp->Start();
		return ret;
	}
	bool RpcServer::Stop()
	{
		//close connectionmgr
		removeAllAcceptedChannel();

		bool ret = m_tcp_listenr_sp->Stop();
		return ret;
    }

	void RpcServer::acceptedHandleCb(const socket_shared_ptr& socket_sp)
	{
		//reqeist qu manage;
		AtomicIncrement<uint32_t>(&m_channel_seq);
		channel_core_shared_ptr channel_core_sp = boost::make_shared<weilin::RpcChannelCore>(m_channel_type, m_channel_seq, socket_sp, m_is_keep_alive, m_ms_timeout, m_call_back_sp);
		channel_core_sp->StartAsyncRecv();
		addAcceptedChannel(m_channel_seq, channel_core_sp);
	}

	void RpcServer::socketStateHandle(const channel_core_shared_ptr& channel_core_sp, const SocketStatus& socket_status)
	{
		if (socket_status == SOCKET_COLOSED)
		{
			removeAcceptedChannel(channel_core_sp->GetSeq());
		}
	}

	void RpcServer::addAcceptedChannel(const uint32_t& channel_seq, const channel_core_shared_ptr& channle_core_sp)
	{
		scope_mutex_locker scoper_locker(m_mutex);
		boost::unordered_map<uint32_t, channel_core_shared_ptr>::iterator index_it = m_channel_unordered_map.find(channel_seq);
		if (index_it != m_channel_unordered_map.end())
		{
			LOG(INFO) <<  "m_accepted_channel_map  socket_sp has add error ";
			return;
		}
		LOG(INFO) << "addAcceptedChannel";
		m_channel_unordered_map[channel_seq] = channle_core_sp;
	}

	void RpcServer::removeAcceptedChannel(const uint32_t& channel_seq)
	{
		scope_mutex_locker scoper_locker(m_mutex);

		boost::unordered_map<uint32_t, channel_core_shared_ptr>::iterator index_it = m_channel_unordered_map.find(channel_seq);
		if (index_it == m_channel_unordered_map.end())
		{
			LOG(INFO) << "m_accepted_channel_map  socket_sp has remove error ";
			return;
		}
		LOG(INFO) << "removeAcceptedChannel";
		m_channel_unordered_map.erase(index_it);
	}

	void RpcServer::removeAllAcceptedChannel()
	{
		scope_mutex_locker scoper_locker(m_mutex);
		boost::unordered_map<uint32_t, channel_core_shared_ptr>::iterator index_it = m_channel_unordered_map.begin();
		while (index_it != m_channel_unordered_map.end())
		{
			index_it->second->Stop();
			index_it++;
		}
		m_channel_unordered_map.clear();
	}
}
